Airflow-провайдеры

Обзор

В dataCraft Core есть два провайдера для Apache Airflow:

apache_airflow_providers_datacraft_airbyte

Описание

Пакет airflow_providers_datacraft_airbyte позволяет вызывать различные методы API Airbyte как задачи в Airflow. Оператор AirbyteGeneralOperator дает возможность выполнить произвольный вызов API. Остальные операторы вызывают конкретные методы, но при этом в отличие от API дают возможность работать не с ID, а с именами объектов. Например, для создания Airbyte Source требуется указать ID Airbyte Workspace. В операторе вместо этого можно указать его имя. Чтобы преобразовать имя в ID, потребуется передать дополнительный аргумент, который получается с помощью другого оператора (в данном случае AirbyteListWorkspaceOperator. Если имя уникальное, оператор сам подставит в вызов API нужный ID, если нет — вернет ошибку.

Версии API

На момент написания, есть две версии API Airbyte:

По операторах по возможности используется официальное API, те части, которые там еще не реализованы, сделаны с помощью устаревшего API.

Справочник

Провайдер содержит:

  • функции для сбора конфигурации
  • конструктор DAG’ов, позволяющий решить следующие задачи:
    • установить в Airbyte необходимые коннекторы
    • настроить sources, destinations, и connections в Airbyte на основе конфигурации YAML
    • создание модели dbt по умолчанию
    • запускать обновление данных через Airbyte за нужные даты
    • запускать обработку через dbt

Список DAG’ов:

Название Рещаемая задача
create_connections Создаёт Airbyte Connection
install_connectors Устанавливает коннекторы в Airbyte на основе данных из конфига connectors.
generate_models Создает файлы моделей dbt, заполненные содержимым по умолчанию, т.е. вызовом макросов по каждому шагу методологии dataCraft Core
template_configs DAG используется для генерации конфигов типа templated_file
sync_data Синхронизирует соединения в Airbyte и запускает модели dbt

Интеграция с dataCraft

apache_airflow_providers_datacraft_dags

Описание

Все DAG’и проекта dataCraft Core имеют общую структуру для того, чтобы пользователю легче было построить собственную логику на основе имеющихся “строительных блоков”.

Параметры запуска

Во всех DAG’ах есть параметр namespace, который определяет:

  • какие конфиги будут использованы (см. Configs)
  • где располагается папка с проектом dbt.
    Папку проекта dbt можно поменять отдельно с помощью конфига base > dbt_dir.

Шаги работы

  1. Во время парсинга DAG’а с помощью функции get_configs(...) определяются все конфиги, релевантные для него.

Все шаги далее представляют собой задачу или группу задач в Airflow.
2. Шаг before, на котором DAG выполняет подготовительную работу, например, создает необходимые папки (см. документацию по конкретному DAG’у).
3. Шаг prepare, на котором DAG определяет список для итерации. Он определяет содержимое всех задач на следующем этапе, например, соединения Airbyte, для которых нужно включить синхронизацию. Пользователь шага prepare и перед следующим шагом iterate может добавить задачу, которая будет модифицировать этот список.
4. Шаг iterate, на котором на каждый элемент списка для итерации создается динамическая задача или динамическая группа задач.
5. Шаг after, на котором DAG очищает или освобождает выделенные ресурсы, например, удаляет созданные временные папки и файлы.

Справочник

Установка

В среде с установленным Airflow выполнить:

pip install apache_airflow_datacraft_dags_provider

Данная команда установит оба пакета. Если нужен только функционал, связанных с Airbyte, то нужно установить пакет apache_airflow_datacraft_dags_provider

Использование

В Airflow создать DAG со следующим содержимым:

from apache_airflow_providers_datacraft import DagBuilder
DagBuilder.create_dags() # Создаем все DAGи с параметрами по умолчанию

Пример с кастомизацией:

from apache_airflow_providers_datacraft import DagBuilder
dag = DagBuilder.prepare_dag("generate_models")
dag.schedule_interval = "@weekly" # Изменяем расписание запуска на 1 раз в неделю
def delete_normalize(prepared_tasks):
  del prepared_tasks['1_silos']['normalize']

dag.add_prepare_hook(delete_normalize) # Удаляем задачи, связанные со слоем normalize

apache_airflow_providers_datacraft_defaults

Описание

Позволяет получать значения конфигов в тех случаях, когда пользователь не задал никаких значений. Например, при первоначальной настройке Airbyte требуется установить коннекторы. За это отвечает DAG install_connectors, для которого требуется указать пути к образам коннекторов и их документации. Если пользователь этого не сделал, можно взять список коннекторов по умолчанию, который лежит в файле connectors.yaml данного пакета. Его содержимое в виде словаря можно получить с помощью функции get_datacraft_defaults.

Несколько версий конфигов

Для обратной совместимости пакет предусматривает хранение старых версий конфигов. Они имеют такое название, но с суффиксом версии, например connectors_v1.1.yaml. Этот суффикс передается при вызове get_datacraft_defaults.

Шаблонизация

Иногда конфиг по умолчанию зависит от переменной, например, названия проекта. В этом случае в пакете этот конфиг содержится в виде шаблона Jinja, например:

...
entities:
{% if feature_has_metrika %}
  - YandexClientId
{% endif %}
  - AccountId
...

Переменные для подстановки в шаблон передаются как аргумент в get_datacraft_defaults в виде словаря, например {"feature_has_metrika": true}.

Использование

from apache_airflow_providers_datacraft_defaults import get_datacraft_defaults
get_datacraft_defaults('connectors', 'yaml', '')

Функция get_datacraft_defaults принимает три аргумента:

  • config_name — название конфига, значение по умолчанию для которого нужно найти
  • format — формат, json или yaml
  • suffix — суффикс для выбора нужной версии (по умолчанию "")
  • template_variables — словарь с переменными для подстановки в шаблон (по умолчанию пустой).